#![cfg(all(feature = "test-utils", feature = "failpoints"))]

use etl::destination::memory::MemoryDestination;
use etl::error::ErrorKind;
use etl::failpoints::{
    START_TABLE_SYNC_BEFORE_DATA_SYNC_SLOT_CREATION, START_TABLE_SYNC_DURING_DATA_SYNC,
};
use etl::state::table::{RetryPolicy, TableReplicationPhase, TableReplicationPhaseType};
use etl::test_utils::database::spawn_source_database;
use etl::test_utils::notify::NotifyingStore;
use etl::test_utils::pipeline::create_pipeline;
use etl::test_utils::test_destination_wrapper::TestDestinationWrapper;
use etl::test_utils::test_schema::{TableSelection, insert_users_data, setup_test_database_schema};
use etl::types::PipelineId;
use etl_telemetry::tracing::init_test_tracing;
use fail::FailScenario;
use rand::random;

/// Checks that a `no_retry` failure injected before the data sync slot creation moves the users
/// table to the errored phase, surfaces a single `WithNoRetry` error on shutdown, and leaves both
/// the destination and the schema store empty.
#[tokio::test(flavor = "multi_thread")]
async fn table_copy_fails_after_data_sync_threw_an_error_with_no_retry() {
    // The scenario guard must stay alive for the whole test; the failpoint is armed to return a
    // `no_retry` error a single time.
    let _scenario = FailScenario::setup();
    fail::cfg(
        START_TABLE_SYNC_BEFORE_DATA_SYNC_SLOT_CREATION,
        "1*return(no_retry)",
    )
    .unwrap();

    init_test_tracing();

    let mut database = spawn_source_database().await;
    let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await;

    // Seed the users table before the pipeline is started.
    let seeded_rows = 10;
    let users_schema = database_schema.users_schema();
    insert_users_data(&mut database, &users_schema.name, 1..=seeded_rows).await;

    let store = NotifyingStore::new();
    let destination = TestDestinationWrapper::wrap(MemoryDestination::new());

    // Build a brand-new pipeline under a random id.
    let mut pipeline = create_pipeline(
        &database.config,
        random::<PipelineId>(),
        database_schema.publication_name(),
        store.clone(),
        destination.clone(),
    );

    // Subscribe, before starting, to the users table entering the errored phase.
    let errored_notification = store
        .notify_on_table_state_type(
            database_schema.users_schema().id,
            TableReplicationPhaseType::Errored,
        )
        .await;

    pipeline.start().await.unwrap();

    errored_notification.notified().await;

    // Shutdown must report exactly one error kind: the no retry one produced by the failpoint.
    let shutdown_error = pipeline.shutdown_and_wait().await.err().unwrap();
    let error_kinds = shutdown_error.kinds();
    assert_eq!(error_kinds.len(), 1);
    assert_eq!(error_kinds[0], ErrorKind::WithNoRetry);

    // Nothing must have reached the destination.
    assert!(destination.get_table_rows().await.is_empty());

    // Likewise, no table schema must have been stored.
    assert!(store.get_table_schemas().await.is_empty());
}

/// Checks that repeated `timed_retry` failures injected before the data sync slot creation end
/// with the users table in the errored phase with a manual retry policy, while the error reported
/// on shutdown keeps the `WithTimedRetry` kind and nothing is written to the destination or store.
#[tokio::test(flavor = "multi_thread")]
async fn table_copy_fails_after_timed_retry_exceeded_max_attempts() {
    let _scenario = FailScenario::setup();
    // With table_error_retry_max_attempts: 2, the failpoint has to fire 3 times so that the 3rd
    // failure makes the system switch to manual retry.
    fail::cfg(
        START_TABLE_SYNC_BEFORE_DATA_SYNC_SLOT_CREATION,
        "3*return(timed_retry)",
    )
    .unwrap();

    init_test_tracing();

    let mut database = spawn_source_database().await;
    let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await;

    // Seed the users table before the pipeline is started.
    let seeded_rows = 10;
    let users_schema = database_schema.users_schema();
    insert_users_data(&mut database, &users_schema.name, 1..=seeded_rows).await;

    let store = NotifyingStore::new();
    let destination = TestDestinationWrapper::wrap(MemoryDestination::new());

    // Build a brand-new pipeline under a random id.
    let mut pipeline = create_pipeline(
        &database.config,
        random::<PipelineId>(),
        database_schema.publication_name(),
        store.clone(),
        destination.clone(),
    );

    // Subscribe, before starting, to the users table being errored with a manual retry policy,
    // which is the state the max attempts handling is expected to produce.
    let manual_retry_notification = store
        .notify_on_table_state(database_schema.users_schema().id, |phase| {
            matches!(
                phase,
                TableReplicationPhase::Errored {
                    retry_policy: RetryPolicy::ManualRetry,
                    ..
                }
            )
        })
        .await;

    pipeline.start().await.unwrap();

    manual_retry_notification.notified().await;

    // The reported kind is still the timed retry one, since that is the error that was triggered.
    let shutdown_error = pipeline.shutdown_and_wait().await.err().unwrap();
    let error_kinds = shutdown_error.kinds();
    assert_eq!(error_kinds.len(), 1);
    assert_eq!(error_kinds[0], ErrorKind::WithTimedRetry);

    // Nothing must have reached the destination.
    assert!(destination.get_table_rows().await.is_empty());

    // Likewise, no table schema must have been stored.
    assert!(store.get_table_schemas().await.is_empty());
}

/// Checks that a single `timed_retry` failure injected before the data sync slot creation does not
/// prevent the users table from reaching `SyncDone`, with all seeded rows copied and the table
/// schema stored.
#[tokio::test(flavor = "multi_thread")]
async fn table_copy_is_consistent_after_data_sync_threw_an_error_with_timed_retry() {
    // The scenario guard must stay alive for the whole test; the failpoint is armed to return a
    // `timed_retry` error a single time.
    let _scenario = FailScenario::setup();
    fail::cfg(
        START_TABLE_SYNC_BEFORE_DATA_SYNC_SLOT_CREATION,
        "1*return(timed_retry)",
    )
    .unwrap();

    init_test_tracing();

    let mut database = spawn_source_database().await;
    let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await;

    // Seed the users table before the pipeline is started.
    let seeded_rows = 10;
    let users_schema = database_schema.users_schema();
    insert_users_data(&mut database, &users_schema.name, 1..=seeded_rows).await;

    let store = NotifyingStore::new();
    let destination = TestDestinationWrapper::wrap(MemoryDestination::new());

    // Build a brand-new pipeline under a random id.
    let mut pipeline = create_pipeline(
        &database.config,
        random::<PipelineId>(),
        database_schema.publication_name(),
        store.clone(),
        destination.clone(),
    );

    // Subscribe, before starting, to the users table reaching the `SyncDone` phase.
    let sync_done_notification = store
        .notify_on_table_state_type(
            database_schema.users_schema().id,
            TableReplicationPhaseType::SyncDone,
        )
        .await;

    pipeline.start().await.unwrap();

    sync_done_notification.notified().await;

    // No error is expected on shutdown, since the same table sync worker task is retried.
    pipeline.shutdown_and_wait().await.unwrap();

    // Every seeded row must have been copied to the destination.
    let table_rows = destination.get_table_rows().await;
    let copied_row_count = table_rows.get(&users_schema.id).unwrap().len();
    assert_eq!(copied_row_count, seeded_rows);

    // Exactly one schema must have been stored, and it must match the users table schema.
    let table_schemas = store.get_table_schemas().await;
    assert_eq!(table_schemas.len(), 1);
    let stored_schema = table_schemas.get(&users_schema.id).unwrap();
    assert_eq!(*stored_schema, users_schema);
}

/// Checks that a single `timed_retry` failure injected during the data sync does not prevent the
/// users table from reaching `SyncDone`, with all seeded rows copied and the table schema stored.
#[tokio::test(flavor = "multi_thread")]
async fn table_copy_is_consistent_during_data_sync_threw_an_error_with_timed_retry() {
    // The scenario guard must stay alive for the whole test; the failpoint is armed to return a
    // `timed_retry` error a single time.
    let _scenario = FailScenario::setup();
    fail::cfg(START_TABLE_SYNC_DURING_DATA_SYNC, "1*return(timed_retry)").unwrap();

    init_test_tracing();

    let mut database = spawn_source_database().await;
    let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await;

    // Seed the users table before the pipeline is started.
    let seeded_rows = 10;
    let users_schema = database_schema.users_schema();
    insert_users_data(&mut database, &users_schema.name, 1..=seeded_rows).await;

    let store = NotifyingStore::new();
    let destination = TestDestinationWrapper::wrap(MemoryDestination::new());

    // Build a brand-new pipeline under a random id.
    let mut pipeline = create_pipeline(
        &database.config,
        random::<PipelineId>(),
        database_schema.publication_name(),
        store.clone(),
        destination.clone(),
    );

    // Subscribe, before starting, to the users table reaching the `SyncDone` phase.
    let sync_done_notification = store
        .notify_on_table_state_type(
            database_schema.users_schema().id,
            TableReplicationPhaseType::SyncDone,
        )
        .await;

    pipeline.start().await.unwrap();

    sync_done_notification.notified().await;

    // No error is expected on shutdown, since the same table sync worker task is retried.
    pipeline.shutdown_and_wait().await.unwrap();

    // Every seeded row must have been copied to the destination.
    let table_rows = destination.get_table_rows().await;
    let copied_row_count = table_rows.get(&users_schema.id).unwrap().len();
    assert_eq!(copied_row_count, seeded_rows);

    // Exactly one schema must have been stored, and it must match the users table schema.
    let table_schemas = store.get_table_schemas().await;
    assert_eq!(table_schemas.len(), 1);
    let stored_schema = table_schemas.get(&users_schema.id).unwrap();
    assert_eq!(*stored_schema, users_schema);
}
